iT邦幫忙

2022 iThome 鐵人賽

DAY 13
2

Middlewares

Moleculer 框架也支援 Middlewares ,類似插入模組套件的概念。 Middlewares 是一個帶有 Hook 與包裝函數的 Object ,它允許包裝 Action 處理程序、事件處理程序、Broker 方法及 Hook 生命週期事件。

範例:

awesome.middleware.js

module.exports = {
    name: "Awesome",

    localAction(next, action) {
        return function(ctx) {
            console.log(`My middleware is called before the `${ctx.action.name}` action executed.`);
            return next(ctx);
        }
    }
};

moleculer.config.js

module.exports = {
    middlewares: true,
};

包裝處理程序

Hooks 中有一些包裝函數,你可以利用它包裝原始處理程序,然後返回一個新的函數。包裝 Hooks 的第一個參數會是 next

範例:以 localAction 為例,包裝本地處理程序

const MyDoSomethingMiddleware = {
    localAction(next, action) {
        // 假如功能被啟用則包裝它,否則直接返回原始處理程序
        if (action.myFunc) {
            // 包裝處理程序
            return function(ctx) {
                doSomethingBeforeHandler(ctx);

                return next(ctx)
                    .then(res => {
                        doSomethingAfterHandler(res);
                        // 返回原始結果
                        return res;
                    })
                    .catch(err => {
                        doSomethingAfterHandlerIfFailed(err);
                        // 拋出錯誤
                        throw err;
                    });
            }
        }
        return next;
    }
};

範例:參數驗證 middleware

localAction 中的 next 會是一個原始的處理程序,或者是經過包裝的處理程序。然而 middleware 也應該要返回一個原始的處理程序,或者是新的包裝處理程序。所以你可以發現範例中的 middleware 會確認 action.params 是否存在,如果存在才返回一個驗證模組的包裝處理程序。假如 action.params 不存在則返回原始的處理程序。

const MyValidator = {
    localAction(next, action) {
        // 假如 `action.params` 存在,則包裝參數驗證器
        if (_.isObject(action.params)) {
            return ctx => {
                this.validate(action.params, ctx.params);
                return next(ctx);
            };
        }
        return next;
    }
};

範例:快取 middleware

除了返回 next 以外,你也可以返回快取。例如當發現了請求資料中包含了快取,你可以直接取得快取內容來替代 next 。注意 next 返回的是一個 Promise

const MyCacher = {
    localAction(next, action) {
        return async function cacherMiddleware(ctx) {
            const cacheKey = this.getCacheKey(action.name, ctx.params, action.cache.keys);
            const content = await this.get(cacheKey);
            // 如果快取存在,直接返回快取內容
            if (content != null) {
                ctx.cachedResult = true;
                return content;
            }
            // 呼叫 next
            const result = await next(ctx);
            // 儲存快取內容
            this.set(cacheKey, result);
            return result;
        }.bind(this);
    }
};

裝飾核心模組 (擴充功能)

Middleware 函數可用於在 ServiceBrokerService 類別加入一個新的功能。

moleculer.config.js

module.exports = {
    middlewares: [
        {
            // Broker 建立後
            created(broker) {
                // 在 broker 加上客製的 allCall 功能函數,用來呼叫所有可用的節點
                broker.allCall = function (action, params, opts = {}) {
					// 取得所有節點 ID
                    const nodeIDs = this.registry.getNodeList({
                        onlyAvailable: true
                    }).map(node => node.id);
                    // 呼叫所有節點
                    return Promise.all(
                        nodeIDs.map(nodeID => broker.call(
                            action, params, Object.assign({ nodeID }, opts)
                        ))
                    );
                };
            }
        }
    ]
};

使用客製的 allCall 功能函數,呼叫所有節點的健康狀況:

const res = await broker.allCall("$node.health");

Hooks

官方提供非常多的 Hooks ,由於鐵人賽時間有限,僅簡單列出所有的 Hooks 使用方式。

my.middleware.js

module.export = {
    name: "MyMiddleware",
	// 本地 Action
    localAction(next, action) {
        return function (ctx) {
            // 變更 context 屬性或其它內容
            return next(ctx)
                .then(res => {
                    // 變更響應內容
                    return res;
                })
                .catch(err => {
                    // 錯誤處理或拋出異常
                    throw err;
                });
        };
    },
	// 遠端 Action
    remoteAction(next, action) {
        return function (ctx) {
            // 變更 context 屬性或其它內容
            return next(ctx)
                .then(res => {
                    // 變更響應內容
                    return res;
                })
                .catch(err => {
                    // 錯誤處理或拋出異常
                    throw err;
                });
        };
    },
	// 本地事件
    localEvent(next, event) {
        return (ctx) => {
            return next(ctx);
        };
    },
	// 本地方法
    localMethod(next, method) {
        return (...args) => {
            console.log(`The '${method.name}' method is called in '${method.service.fullName}' service.`, args);
            return next(...args);
        };
    },
	// broker.createService 的 Hook
    createService(next) {
        return function (schema, schemaMods) {
            console.log("The 'createService' is called.");
            return next(schema, schemaMods);
        };
    },
	// broker.destroyService 的 Hook
    destroyService(next) {
        return function (service) {
            console.log("The 'destroyService' is called.");
            return next(service);
        };
    },
	// broker.call 的 Hook
    call(next) {
        return function (actionName, params, opts) {
            console.log("The 'call' is called.", actionName);
            return next(actionName, params, opts).then(res => {
                console.log("Response:", res);
                return res;
            });
        };
    },
	// broker.mcall 的 Hook
    mcall(next) {
        return function () {
            console.log("The 'mcall' is called.");
            return next(...arguments).then(res => {
                console.log("Response:", res);
                return res;
            });
        };
    },
	// broker.emit 的 Hook
    emit(next) {
        return function (eventName, payload, opts) {
            console.log("The 'emit' is called.", eventName);
            return next(eventName, payload, opts);
        };
    },
	// broker.broadcast 的 Hook
    broadcast(next) {
        return function (eventName, payload, opts) {
            console.log("The 'broadcast' is called.", eventName);
            return next(eventName, payload, opts);
        };
    },
	// broker.broadcastLocal 的 Hook
    broadcastLocal(next) {
        return function (eventName, payload, opts) {
            console.log("The 'broadcastLocal' is called.", eventName);
            return next(eventName, payload, opts);
        };
    },
	// 服務建立後 (同步)
    serviceCreated(service) {
        console.log("Service created", service.fullName);
    },
	// 服務啟動前 (異步)
    serviceStarting(service) {
        console.log("Service is starting", service.fullName);
    },
	// 服務啟動後 (異步)
    serviceStarted(service) {
        console.log("Service started", service.fullName);
    },
	// 服務停止前 (異步)
    serviceStopping(service) {
        console.log("Service is stopping", service.fullName);
    },
	// 服務停止後 (異步)
    serviceStopped(service) {
        console.log("Service stopped", service.fullName);
    },
	// 服務註冊
    registerLocalService(next) {
        return (service) => {
            console.log("Registering a local service", service.name);
            return next(service);
        };
    },
	// 服務建立前,會在 Merged 完成後
    serviceCreating(service, schema) {
        // 變更 schema
        schema.myProp = "John";
    },
	// Transit 發送前
    transitPublish(next) {
        return (packet) => {
            return next(packet);
        };
    },
	// Transit 接收解析前
    transitMessageHandler(next) {
        return (cmd, packet) => {
            return next(cmd, packet);
        };
    },
	// Transporter 發送前
    transporterSend(next) {
        return (topic, data, meta) => {
            // 變更 data 內容,注意 data 是個 Buffer
            return next(topic, data, meta);
        };
    },
	// Transporter 接收後
    transporterReceive(next) {
        return (cmd, data, s) => {
            // 變更 data 內容,注意 data 是個 Buffer
            return next(cmd, data, s);
        };
    },
	// 建立 Log (同步)
    newLogEntry(type, args, bindings) {
        // 變更 `args` 內容
    },
	// 建立 Broker (異步)
    created(broker) {
        console.log("Broker created");
    },
	// 啟動 Broker 前 (異步)
    starting(broker) {
        console.log("Broker is starting");
    },
	// 啟動 Broker 後 (異步)
    started(broker) {
        console.log("Broker started");
    },
	// 停止 Broker 前 (異步)
    stopping(broker) {
        console.log("Broker is stopping");
    },
	// 停止 Broker 後 (異步)
    stopped(broker) {
        console.log("Broker stopped");
    },
};

更多詳細的使用方法請參考官方手冊:

https://moleculer.services/docs/0.14/middlewares.html#Hooks

內部 middlewares

Moleculer 的許多功能都有內部的 middlewares ,它會在 broker 建立時被自動載入。然而你也可以在 Broker 選項設定 internalMiddlewares: false 來關閉它,但在這種情況下你必須在 Broker 的 middlewares: [] 選項中,明確指定所需的 middlewares 。

清單:

Class name Type Description

名稱 類型 說明
ActionHook Optional Action hooks 處理程序
Validator Optional 參數驗證器
Bulkhead Optional Bulkhead 功能
Cacher Optional 快取
ContextTracker Optional Context tracker 功能
CircuitBreaker Optional Circuit Breaker 功能
Timeout Always Timeout 功能
Retry Always Retry 功能
Fallback Always Fallback 功能
ErrorHandler Always 錯誤處理
Tracing Optional 追蹤功能
Metrics Optional Metrics 功能
Debounce Optional Debounce 功能
Throttle Optional Throttle 功能
Transmit.Encryption Optional 加密傳輸
Transmit.Compression Optional 壓縮傳輸
Debugging.TransitLogger Optional Transit Logger
Debugging.ActionLogger Optional Action Logger

範例:

const { Bulkhead, Retry } = require("moleculer").Middlewares;

傳輸 middlewares

加密

使用 Node.js 內建的 crypto 加密函式庫,加入 AES middleware 來做加密,以增加通訊傳輸的安全性。

moleculer.config.js

const crypto = require("crypto");
const { Middlewares } = require("moleculer");
const initVector = crypto.randomBytes(16);

module.exports = {
    middlewares: [
        Middlewares.Transmit.Encryption(
            "secret-password", "aes-256-cbc", initVector
        )
    ]
};

壓縮

使用壓縮 middleware 可以減少 Transporter 傳輸的資料大小,內建是使用 Node.js 的 zlib 函式庫來處理,可以使用 deflatedeflateRawgzip

moleculer.config.js

const { Middlewares } = require("moleculer");

module.exports = {
    middlewares: [
        Middlewares.Transmit.Compression("deflate")
    ]
};

除錯 Middlewares

Transit logger

Transit logger middleware 讓我們很容易的追蹤服務與服務間的訊息交換。

moleculer.config.js

const { Middlewares } = require("moleculer");

module.exports = {
  middlewares: [
    Middlewares.Debugging.TransitLogger({
      logPacketData: false,
      folder: null,
      colors: {
        send: "magenta",
        receive: "blue"
      },
      packetFilter: ["HEARTBEAT"]
    })
  ]
};

參數:

名稱 類型 預設值 說明
logger <Object> | <Function> null Logger 類別
logLevel <String> info Log 等級
logPacketData <Boolean> false Logs packet 參數
folder <Object> null 輸出目錄
extension <String> .json Log 副檔名
color.receive <String> grey 接收顏色[2]
color.send <String> grey 送出顏色[2]
packetFilter <String[]> HEARTBEAT 要跳過的packet[3]

Action Logger

Action Logger middleware 是用來追蹤服務 Actions 是如何執行的。

moleculer.config.js

const { Middlewares } = require("moleculer");

module.exports = {
  middlewares: [
    Middlewares.Debugging.ActionLogger({
      logParams: true,
      logResponse: true,
      folder: null,
      colors: {
        send: "magenta",
        receive: "blue"
      },
      whitelist: ["**"]
    })
  ]
};
名稱 類型 預設值 說明
logger <Object> | <Function> null Logger 類別
logLevel <String> info Log 等級
logParams <Boolean> false 紀錄請求參數
logMeta <Boolean> false 紀錄 meta 參數
folder <Object> null 輸出目錄
extension <String> .json Log 副檔名
color.request <String> yellow 請求顏色[2]
color.response <String> cyan 響應顏色[2]
colors.error <String> red 錯誤顏色[2]
whitelist <String[]> ["**"] 要記錄的 Actions 名稱,可使用正則表達式或萬用字元

事件執行率

節流

節流可以用來降低事件的觸發頻率。由於它會以固定的速率去觸發監聽器,因此監聽器會忽略事件的部分資訊。它使用與 Lodash 的 _.throttle 相同的方法[4] 。

my.service.js

module.exports = {
    name: "my",
    events: {
        "config.changed": {
            throttle: 3000,
            // 3 秒內不會再次呼叫
            handler(ctx) { /* ... */}
        }
    }
};

防抖

防抖不同於節流,當事件不斷的觸發時它不會呼叫,直到事件停止觸發後等待一段時間才會呼叫處理程序。它使用與 Lodash 的 _.debounce 相同的方法[5] 。

my.service.js

module.exports = {
    name: "my",
    events: {
        "config.changed": {
            debounce: 5000,
            // Handler will be invoked when events are not received in 5 seconds.
            handler(ctx) { /* ... */}
        }
    }
};

關於節流與防抖的差異,官方手冊推薦可以看 David Corbacho 的文章[6]。

Loading & Extending

客製化函數可以直接在 Middlewares 擴充,然後就可以像內建的 middlewares 函數一樣可以在 middlewares[] 直接加入使用。

moleculer.config.js

const { Middlewares } = require("moleculer");

// 建立一個客製化擴充 middleware
Middlewares.MyCustom = {
    created(broker) {
        broker.logger.info("My custom middleware is created!");
    }
};

module.exports = {
    logger: true,
    middlewares: [
        // 直接使用客製化 middleware
        "MyCustom"
    ]
};

內部 middleware 全景圖


Fig. 1. Moleculer middleware

參考文獻

[1] Middlewares, https://moleculer.services/docs/0.14/middlewares.html
[2] Chalk colors, https://github.com/chalk/chalk#colors
[3] Protocol 4.0 (rev. 1), https://github.com/moleculer-framework/protocol/blob/master/4.0/PROTOCOL.md
[4] Lodash throttle, https://lodash.com/docs/4.17.15#throttle
[5] Lodash debounce, https://lodash.com/docs/4.17.15#debounce
[6] David Corbacho, Debouncing and Throttling Explained Through Examples, https://css-tricks.com/debouncing-throttling-explained-examples/

家家酒小劇場

  • Otter - 今天的內容好多阿
  • Boxy - 內容看起來比較多,但是使用方法都大同小異,有需要用到的時候再來查詢囉

上一篇
Day 12 : Logging
下一篇
Day 14 : 網路連結
系列文
Moleculer 家家酒31
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

1 則留言

0
json_liang
iT邦研究生 5 級 ‧ 2022-09-13 10:21:33

middleware 真的是一個很重要的功能!感謝分享

我要留言

立即登入留言